{
  "cells": [
    {
      "cell_type": "markdown",
      "id": "diverse-space",
      "metadata": {
        "id": "diverse-space"
      },
      "source": [
        "# High Level Python API\n",
        "\n",
        "<a href=\"https://colab.research.google.com/github/lwa-project/bifrost/blob/master/tutorial/07_high_level_api.ipynb\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\"></a>\n",
        "\n",
        "Up until now we have been using the low level Python API that Bifrost has to show the inner workings of the framework and how to build a pipeline.  However, Bifrost also has a high level Python API that makes building blocks and pipelines easier with less code.  In this section we will look at this interface.\n",
        "\n",
        "The general idea of the high-level API is to allow a pipeline to be built out of processing blocks that are connected together like Lego. For example, here is a pipeline that reads from a sigproc file, performs a fast dispersion measure transform, and then writes out the data to disk:\n",
        "\n",
        "```python\n",
        "import bifrost as bf\n",
        "import sys\n",
        "\n",
        "filenames = sys.argv[1:]\n",
        "\n",
        "print(\"Building pipeline\")\n",
        "data = bf.blocks.read_sigproc(filenames, gulp_nframe=128)\n",
        "data = bf.blocks.copy(data, 'cuda')\n",
        "data = bf.blocks.transpose(data, ['pol', 'freq', 'time'])\n",
        "data = bf.blocks.fdmt(data, max_dm=100.)\n",
        "data = bf.blocks.copy(data, 'cuda_host')\n",
        "bf.blocks.write_sigproc(data)\n",
        "\n",
        "print(\"Running pipeline\")\n",
        "bf.get_default_pipeline().run()\n",
        "print(\"All done\")\n",
        "```\n",
        "\n",
        "Here we are continually passing the block's output, `data`, to the next block in the pipeline. At runtime, the blocks are connected together (as a directed graph) with ring buffers in between.\n"
      ]
    },
    {
      "cell_type": "code",
      "source": [
        "%%capture install_log\n",
        "# Import bifrost, but attempt to auto-install if needed (and we're running on\n",
        "# Colab). If something goes wrong, evaluate install_log.show() in a new block\n",
        "# to retrieve the details.\n",
        "try:\n",
        "  import bifrost\n",
        "except ModuleNotFoundError as exn:\n",
        "  try:\n",
        "    import google.colab\n",
        "  except ModuleNotFoundError:\n",
        "    raise exn\n",
        "  !sudo apt-get -qq install universal-ctags libopenblas-dev software-properties-common build-essential\n",
        "  !pip install -q contextlib2 pint simplejson scipy git+https://github.com/ctypesgen/ctypesgen.git\n",
        "  ![ -d ~/bifrost/.git ] || git clone https://github.com/lwa-project/bifrost ~/bifrost\n",
        "  !(cd ~/bifrost && ./configure --disable-cuda && make -j all && sudo make install)\n",
        "  import bifrost"
      ],
      "metadata": {
        "id": "kIQelikFM5mZ"
      },
      "id": "kIQelikFM5mZ",
      "execution_count": 1,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "\n",
        "### Creating a transform block\n",
        "\n",
        "Many common processing operations have bifrost blocks, but it likely that you will need to make a custom block at some stage. To see how, let's start by revisiting the `CopyOp` block from the pipelines section:"
      ],
      "metadata": {
        "id": "obSTqcFKM4WV"
      },
      "id": "obSTqcFKM4WV"
    },
    {
      "cell_type": "code",
      "execution_count": 2,
      "id": "black-cruise",
      "metadata": {
        "id": "black-cruise"
      },
      "outputs": [],
      "source": [
        "class CopyOp(object):\n",
        "    def __init__(self, iring, oring, ntime_gulp=250, guarantee=True, core=-1):\n",
        "        self.iring = iring\n",
        "        self.oring = oring\n",
        "        self.ntime_gulp = ntime_gulp\n",
        "        self.guarantee = guarantee\n",
        "        self.core = core\n",
        "        \n",
        "    def main(self):\n",
        "        with self.oring.begin_writing() as oring:\n",
        "            for iseq in self.iring.read(guarantee=self.guarantee):\n",
        "                ihdr = json.loads(iseq.header.tostring())\n",
        "                \n",
        "                print(\"Copy: Start of new sequence:\", str(ihdr))\n",
        "                \n",
        "                time_tag = ihdr['time_tag']\n",
        "                navg     = ihdr['navg']\n",
        "                nbeam    = ihdr['nbeam']\n",
        "                chan0    = ihdr['chan0']\n",
        "                nchan    = ihdr['nchan']\n",
        "                chan_bw  = ihdr['bw'] / nchan\n",
        "                npol     = ihdr['npol']\n",
        "                pols     = ihdr['pols']\n",
        "                pols     = pols.replace('CR', 'XY_real')\n",
        "                pols     = pols.replace('CI', 'XY_imag')\n",
        "\n",
        "                igulp_size = self.ntime_gulp*nbeam*nchan*npol*4        # float32\n",
        "                ishape = (self.ntime_gulp,nbeam,nchan,npol)\n",
        "                self.iring.resize(igulp_size, igulp_size*5)\n",
        "                \n",
        "                ogulp_size = igulp_size\n",
        "                oshape = ishape\n",
        "                self.oring.resize(ogulp_size)\n",
        "                \n",
        "                ohdr = ihdr.copy()\n",
        "                ohdr_str = json.dumps(ohdr)\n",
        "                \n",
        "                iseq_spans = iseq.read(igulp_size)\n",
        "                with oring.begin_sequence(time_tag=time_tag, header=ohdr_str) as oseq:\n",
        "                    for ispan in iseq_spans:\n",
        "                        if ispan.size < igulp_size:\n",
        "                            continue # Ignore final gulp\n",
        "                            \n",
        "                        with oseq.reserve(ogulp_size) as ospan:\n",
        "                            idata = ispan.data_view(numpy.float32)\n",
        "                            odata = ospan.data_view(numpy.float32)    \n",
        "                            odata[...] = idata"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "fiscal-coverage",
      "metadata": {
        "id": "fiscal-coverage"
      },
      "source": [
        "There is a lot of setup in here and iteration control that is common across many of the blocks that we have looked at.  In the high level API much of this can be abstracted away using the classes defined in `bifrost.pipelines`:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "id": "intelligent-retailer",
      "metadata": {
        "id": "intelligent-retailer"
      },
      "outputs": [],
      "source": [
        "import copy\n",
        "\n",
        "from bifrost import pipeline\n",
        "\n",
        "class NewCopyOp(pipeline.TransformBlock):\n",
        "    def __init__(self, iring, *args, **kwargs):\n",
        "        super(NewCopyOp, self).__init__(iring, *args, **kwargs)\n",
        "        \n",
        "    def on_sequence(self, iseq):\n",
        "        ihdr = iseq.header\n",
        "        print(\"Copy: Start of new sequence:\", str(ihdr))\n",
        "        \n",
        "        ohdr = copy.deepcopy(iseq.header)\n",
        "        return ohdr\n",
        "\n",
        "    def on_data(self, ispan, ospan):\n",
        "        in_nframe  = ispan.nframe\n",
        "        out_nframe = in_nframe\n",
        "\n",
        "        idata = ispan.data\n",
        "        odata = ospan.data\n",
        "\n",
        "        odata[...] = idata\n",
        "        return out_nframe"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "unique-basement",
      "metadata": {
        "id": "unique-basement"
      },
      "source": [
        "That is much more compact.  The key things in this new class are:\n",
        " 1. the `on_sequence` method is called whenever a new sequence starts and is used to update the header for the output ring buffer and\n",
        " 2. the `on_data` method is called for each span/gulp that is processed.\n",
        "\n",
        "Put another way, the `on_sequence` happens when there is new *metadata*, and `on_data` happens whenever there is new *data* to process. For example, `on_sequence` may be called when reading a new file, or starting an observation.\n",
        " \n"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "communist-worse",
      "metadata": {
        "id": "communist-worse"
      },
      "source": [
        "### Creating a source block\n",
        "\n",
        " Similarly, we can translate the `GeneratorOp` and `WriterOp` blocks as well using sub-classes of `bifrost.pipeline.SourceBlock` and `bifrost.pipeline.SinkBlock`, respectively:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 4,
      "id": "grand-central",
      "metadata": {
        "id": "grand-central"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "import time\n",
        "import numpy\n",
        "\n",
        "class NewGeneratorOp(pipeline.SourceBlock):\n",
        "    def __init__(self, ntime_gulp, *args, **kwargs):\n",
        "        super(NewGeneratorOp, self).__init__(['generator',], 1,\n",
        "                                             *args, **kwargs)\n",
        "        \n",
        "        self.ntime_gulp = ntime_gulp\n",
        "        self.ngulp_done = 0\n",
        "        self.ngulp_max = 10\n",
        "        \n",
        "        self.navg = 24\n",
        "        tint = self.navg / 25e3\n",
        "        self.tgulp = tint * self.ntime_gulp\n",
        "        self.nbeam = 1\n",
        "        self.chan0 = 1234\n",
        "        self.nchan = 16*184\n",
        "        self.npol = 4\n",
        "        \n",
        "    def create_reader(self, name):\n",
        "        self.ngulp_done = 0\n",
        "        \n",
        "        class Random(object):\n",
        "            def __init__(self, name):\n",
        "                self.name = name\n",
        "            def __enter__(self):\n",
        "                return self\n",
        "            def __exit__(self, type, value, tb):\n",
        "                return True\n",
        "            def read(self, *args):\n",
        "                return numpy.random.randn(*args)\n",
        "                \n",
        "        return Random(name)\n",
        "        \n",
        "    def on_sequence(self, reader, name):\n",
        "        ohdr = {'time_tag': int(int(time.time())*196e6),\n",
        "                'seq0':     0, \n",
        "                'chan0':    self.chan0,\n",
        "                'cfreq0':   self.chan0*25e3,\n",
        "                'bw':       self.nchan*25e3,\n",
        "                'navg':     self.navg,\n",
        "                'nbeam':    self.nbeam,\n",
        "                'nchan':    self.nchan,\n",
        "                'npol':     self.npol,\n",
        "                'pols':     'XX,YY,CR,CI',\n",
        "               }\n",
        "        ohdr['_tensor'] = {'dtype':  'f32',\n",
        "                           'shape':  [-1,\n",
        "                                      self.ntime_gulp,\n",
        "                                      self.nbeam,\n",
        "                                      self.nchan,\n",
        "                                      self.npol]\n",
        "                          }\n",
        "        return [ohdr,]\n",
        "        \n",
        "    def on_data(self, reader, ospans):\n",
        "        indata = reader.read(self.ntime_gulp, self.nbeam, self.nchan, self.npol)\n",
        "        time.sleep(self.tgulp)\n",
        "\n",
        "        if indata.shape[0] == self.ntime_gulp \\\n",
        "           and self.ngulp_done < self.ngulp_max:\n",
        "            ospans[0].data[...] = indata\n",
        "            self.ngulp_done += 1\n",
        "            return [1]\n",
        "        else:\n",
        "            return [0]                    "
      ]
    },
    {
      "cell_type": "markdown",
      "id": "essential-burst",
      "metadata": {
        "id": "essential-burst"
      },
      "source": [
        "For the `bifrost.pipeline.SourceBlock` we need have slightly different requirements on `on_sequence` and `on_data`.  Plus, we also need to define a `create_reader` method that returns a context manager (a class with `__enter__` and `__exit__` methods).  \n",
        "\n",
        "For `on_sequence` we need to accept two arguments: a context manager created by `create_reader` and an identifying name (although it is not used here).  \n",
        "\n",
        "For `on_data` we also have two arguments now, the context manager and a list of output spans.  In here we need to grab the data from `reader` and put it into the appropriate part of the output spans.\n",
        "\n",
        "### The all-important bifrost `_tensor` dictionary\n",
        "\n",
        "We also see in `on_sequence` here that the header dictionary has a new required `_tensor` key.  This key is the key to automatically chaining blocks together into a pipeline since it defines the data type and dimensionality for the spans/gulps. At a minimum, the `_tensor` must define the data dtype and shape:\n",
        "\n",
        "```python\n",
        "    '_tensor': {\n",
        "            'dtype':  self.dtype,\n",
        "            'shape':  [-1, self.gulp_size],\n",
        "            },\n",
        "```\n",
        "\n",
        "The first index of shape is -1, to indicate that this is a single gulp from the data stream. \n",
        "\n",
        "However, most block require three additional keywords: \n",
        "* `labels`, which give human-friendly names to each axis (e.g. 'time' or 'frequency').\n",
        "* `scales`, which defines the start value and step size for each axis (e.g. `[1420, 0.1]` sets start value 1420, step size 0.1).\n",
        "* `units`, which defines the units for each axis (e.g. 's' or 'MHz'). These are parsed using [Pint](https://pint.readthedocs.io/en/stable/), and can be used for consistency checking. For example, the `bf.views.merge_axes` view won't allow axes to merge if they have different units, say 'MHz' and 'Jy'. If you attempted to merge axes with 'MHz' and 'kHz' units, this would be allowed, and the corresponding `scales` are updated consistently.\n",
        "\n",
        "Here is another example `_tensor` with all keywords:\n",
        "\n",
        "```python\n",
        "    t0 = 1620192408.005579  # unix timestamp from when writing this tutorial\n",
        "    dt = 0.1                # 0.1 second step size\n",
        "    '_tensor': {\n",
        "            'dtype':  'cf32'\n",
        "            'shape':  [-1, 1024, 4],\n",
        "            'labels': ['time', 'frequency', 'pol'],\n",
        "            'units':  ['s', 'MHz', ''],\n",
        "            'scales': [[t0, dt], [1420, 0.1], [None, None]]\n",
        "    }\n",
        "```\n",
        "\n",
        "A transform block reads the tensor metadata, and must output a copy of the tensor with any required changes to shape/scale/units etc. \n",
        "\n",
        "### Creating a sink block\n",
        "\n",
        "We can also translate the original `WriterOp`:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 5,
      "id": "hispanic-toddler",
      "metadata": {
        "id": "hispanic-toddler"
      },
      "outputs": [],
      "source": [
        "class NewWriterOp(pipeline.SinkBlock):\n",
        "    def __init__(self, iring, *args, **kwargs):\n",
        "        super(NewWriterOp, self).__init__(iring, *args, **kwargs)\n",
        "        \n",
        "        self.time_tag = None\n",
        "        self.navg = 0\n",
        "        self.nbeam = 0\n",
        "        self.nchan = 0\n",
        "        self.npol = 0\n",
        "        \n",
        "    def on_sequence(self, iseq):\n",
        "        ihdr = iseq.header\n",
        "        print(\"Writer: Start of new sequence:\", str(ihdr))\n",
        "        \n",
        "        self.time_tag = iseq.time_tag\n",
        "        self.navg = ihdr['navg']\n",
        "        self.nbeam = ihdr['nbeam']\n",
        "        self.nchan = ihdr['nchan']\n",
        "        self.npol = ihdr['npol']\n",
        "\n",
        "    def on_data(self, ispan):\n",
        "        idata = ispan.data.view(numpy.float32)\n",
        "        idata = idata.reshape(-1, self.nbeam, self.nchan, self.npol)\n",
        "        \n",
        "        with open(f\"{self.time_tag}.dat\", 'wb') as fh:\n",
        "            fh.write(idata.tobytes())\n",
        "            print('  ', fh.name, '@', os.path.getsize(fh.name))\n",
        "        self.time_tag += self.navg * idata.shape[0] * (int(196e6) // int(25e3))"
      ]
    },
    {
      "cell_type": "markdown",
      "id": "arbitrary-dynamics",
      "metadata": {
        "id": "arbitrary-dynamics"
      },
      "source": [
        "Since this is a data sink we only have one argument for `on_data` which gives the block the current data span/gulp.\n",
        "\n",
        "We then can put these new blocks all together and launch them under Bifrost's default pipeline with:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 6,
      "id": "objective-principal",
      "metadata": {
        "id": "objective-principal",
        "outputId": "b4c6067b-2b11-49c2-c5ab-490d6bc01a78",
        "colab": {
          "base_uri": "https://localhost:8080/"
        }
      },
      "outputs": [
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "Copy: Start of new sequence: {'time_tag': 324661240792000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', '_tensor': {'dtype': 'f32', 'shape': [-1, 250, 1, 2944, 4]}, 'name': 'unnamed-sequence-0', 'gulp_nframe': 1}\n",
            "Writer: Start of new sequence: {'time_tag': 324661240792000000, 'seq0': 0, 'chan0': 1234, 'cfreq0': 30850000.0, 'bw': 73600000.0, 'navg': 24, 'nbeam': 1, 'nchan': 2944, 'npol': 4, 'pols': 'XX,YY,CR,CI', '_tensor': {'dtype': 'f32', 'shape': [-1, 250, 1, 2944, 4]}, 'name': 'unnamed-sequence-0', 'gulp_nframe': 1}\n",
            "   324661240792000000.dat @ 11776000\n",
            "   324661240839040000.dat @ 11776000\n",
            "   324661240886080000.dat @ 11776000\n",
            "   324661240933120000.dat @ 11776000\n",
            "   324661240980160000.dat @ 11776000\n",
            "   324661241027200000.dat @ 11776000\n",
            "   324661241074240000.dat @ 11776000\n",
            "   324661241121280000.dat @ 11776000\n",
            "   324661241168320000.dat @ 11776000\n",
            "   324661241215360000.dat @ 11776000\n"
          ]
        }
      ],
      "source": [
        "b_gen = NewGeneratorOp(250)\n",
        "b_cpy = NewCopyOp(b_gen)\n",
        "b_out = NewWriterOp(b_cpy)\n",
        "\n",
        "p =  pipeline.get_default_pipeline()\n",
        "p.run()\n",
        "del p"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.7.6"
    },
    "colab": {
      "name": "07_high_level_api.ipynb",
      "provenance": []
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}
